#==============================================================================#
# 3-merge-in-unions.R
# Matt Curtis 25/08/22
# mjdcurtis@gmail.com
#------------------------------------------------------------------------------#
# recursively merge in geni_unions.csv to get family trees
# updated feb 2 2023
# updated may 29, 2025
#==============================================================================#
# Start from a clean global environment. rm(list = ls()) removes the objects
# in the global environment only; gc() then runs a garbage collection.
rm(list = ls())
gc()
#library(tidyverse)
#library(data.table)
# dplyr is never attached: its verbs are called as dplyr::inner_join(),
# dplyr::select() and dplyr::compute(); the unqualified distinct() and
# bind_rows() calls are resolved from the packages attached here.
# NOTE(review): no str_*() or progress_bar calls appear in this file, so
# stringr and progress look unused -- confirm before removing.
library(stringr)
library(arrow)
library(tidytable)
library(progress)
#------------------------------------------------------------------------------#

# set working directory once
# every path below is relative ("./1_raw_data/...", "./2_scripts/...") and is
# resolved against this directory
setwd("~/Replication package")

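# this file has 6 rounds of merging, alternating between
# persons -> unions (rounds 1, 3, 5) and unions -> persons (rounds 2, 4, 6)
# all six rounds always run: there is no 0/1 switch vector in this script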
#------------------------------------------------------------------------------#
# 1. link persons to unions
# this gives us:
#   parent's marriages
#   subject's marriage

print('1. link persons to unions')

# Seed set of persons: column 1 of fr-pids-bd.parquet stacked on column 14 of
# fr-pids-m.parquet. Both are read as Arrow Tables (as_data_frame = FALSE) so
# the joins below are evaluated by Arrow.
persons_bd <-
  read_parquet("./2_scripts/2_0_tempfiles/fr-pids-bd.parquet",
               #integer64="character",
               as_data_frame=FALSE,
               col_select=c(1)) %>% 
  dplyr::compute()
persons_m <-
  read_parquet("./2_scripts/2_0_tempfiles/fr-pids-m.parquet",
               #integer64="character",
               as_data_frame=FALSE,
               col_select=c(14))  %>% 
  dplyr::compute()

persons <- rbind(persons_bd, persons_m) %>% 
  dplyr::compute()
rm(persons_bd, persons_m)
gc()

# split the data in to chunks: geni_unions parts 0..max_chunk
max_chunk <- 16

for (chunk_n in 0:max_chunk) {
  print(paste0("Reading chunk ",chunk_n+1,"/",max_chunk+1))
  fname <- paste0("./1_raw_data/1_2_geni/1_2_geni_chunked/geni_unions/part-",
                  chunk_n, ".parquet")
  unions <- read_parquet(fname,
                         as_data_frame = FALSE,
                         col_select = c(1, 2))
  gc()

  # Natural join on whatever column name `persons` shares with the chunk.
  # NOTE(review): presumably that column is profile_id -- it is not visible
  # here because `persons` is selected by position; confirm and pass `by =`.
  unions <- unions %>% 
    dplyr::inner_join(persons) %>% 
    dplyr::select(union_id) %>% 
    dplyr::compute()

  print(paste0("Saving ",nrow(unions),":",chunk_n+1,"/",max_chunk+1))
  write_parquet(unions,
                paste0("./2_scripts/2_0_tempfiles/fr-unions1_",chunk_n,".parquet"))
}

# Combine the per-chunk results. The pattern is anchored so that only this
# round's chunk files (fr-unions1_<n>.parquet) are picked up.
files <- list.files(path = "./2_scripts/2_0_tempfiles",
                    pattern = "^fr-unions1_[0-9]+\\.parquet$")
chunk_paths <- paste0("./2_scripts/2_0_tempfiles/", files)

# Read every chunk into a list and bind once, instead of growing `output`
# with bind_rows() on each iteration.
chunk_tables <- lapply(seq_along(files), function(i) {
  print(files[i])
  read_parquet(chunk_paths[i]) %>% 
    distinct()
})

# collect data_frame and make distinct 
output <- bind_rows(chunk_tables) %>% 
  distinct()
rm(chunk_tables)
write_parquet(output,"./2_scripts/2_0_tempfiles/fr-union1.parquet")

# Delete the chunk files only after the combined file has been written, so a
# failure above does not lose intermediate results.
invisible(file.remove(chunk_paths))
invisible(gc())

#------------------------------------------------------------------------------#
# 2. link unions to persons
# this gives us:
# parents
# subject
# siblings
# spouse
# children

print('2. link unions to persons')

# Round 1 output: a single union_id column, kept as an Arrow Table.
previous <- read_parquet("./2_scripts/2_0_tempfiles/fr-union1.parquet",
                         as_data_frame = FALSE)

# split the data in to chunks: geni_unions parts 0..max_chunk
max_chunk <- 16

for (chunk_n in 0:max_chunk) {
  print(paste0("Reading chunk ",chunk_n+1,"/",max_chunk+1))
  fname <- paste0("./1_raw_data/1_2_geni/1_2_geni_chunked/geni_unions/part-",
                  chunk_n, ".parquet")
  unions <- read_parquet(fname,
                         as_data_frame = FALSE,
                         col_select = c(1, 2))
  gc()

  # Keep the persons attached to any union found in round 1. The key is
  # spelled out so a wrong `previous` file fails instead of silently joining
  # on a different shared column.
  unions <- unions %>% 
    dplyr::inner_join(previous, by = "union_id") %>% 
    dplyr::select(profile_id) %>% 
    dplyr::compute()

  print(paste0("Saving ",nrow(unions),":",chunk_n+1,"/",max_chunk+1))
  write_parquet(unions,
                paste0("./2_scripts/2_0_tempfiles/fr-unions2_",chunk_n,".parquet"))
}

# Combine the per-chunk results. The pattern is anchored so that only this
# round's chunk files (fr-unions2_<n>.parquet) are picked up.
files <- list.files(path = "./2_scripts/2_0_tempfiles",
                    pattern = "^fr-unions2_[0-9]+\\.parquet$")
chunk_paths <- paste0("./2_scripts/2_0_tempfiles/", files)

# Read every chunk into a list and bind once, instead of growing `output`
# with bind_rows() on each iteration.
chunk_tables <- lapply(seq_along(files), function(i) {
  print(files[i])
  read_parquet(chunk_paths[i]) %>% 
    distinct()
})

# collect data_frame and make distinct 
output <- bind_rows(chunk_tables) %>% 
  distinct()
rm(chunk_tables)
write_parquet(output,"./2_scripts/2_0_tempfiles/fr-union2.parquet")

# Delete the chunk files only after the combined file has been written, so a
# failure above does not lose intermediate results.
invisible(file.remove(chunk_paths))
invisible(gc())



#------------------------------------------------------------------------------#
# 3. link persons to unions
# this gives us:
#  grandparent's marriage
#  parent's marriage
#  subject's marriage
#  sibling's marriage
#  parents in-law's marriage
#  children's marriages
#  (second marriages...)

print(' 3. link persons to unions')

# Round 2 output: a single profile_id column, kept as an Arrow Table.
previous <- read_parquet("./2_scripts/2_0_tempfiles/fr-union2.parquet",
                         as_data_frame = FALSE)

# split the data in to chunks: geni_unions parts 0..max_chunk
max_chunk <- 16

for (chunk_n in 0:max_chunk) {
  print(paste0("Reading chunk ",chunk_n+1,"/",max_chunk+1))
  fname <- paste0("./1_raw_data/1_2_geni/1_2_geni_chunked/geni_unions/part-",
                  chunk_n, ".parquet")
  unions <- read_parquet(fname,
                         as_data_frame = FALSE,
                         col_select = c(1, 2))
  gc()

  # Keep the unions of any person found in round 2. The key is spelled out
  # so a wrong `previous` file fails instead of silently joining on a
  # different shared column.
  unions <- unions %>% 
    dplyr::inner_join(previous, by = "profile_id") %>% 
    dplyr::select(union_id) %>% 
    dplyr::compute()

  print(paste0("Saving ",nrow(unions),":",chunk_n+1,"/",max_chunk+1))
  write_parquet(unions,
                paste0("./2_scripts/2_0_tempfiles/fr-unions3_",chunk_n,".parquet"))
}

# Combine the per-chunk results. The pattern is anchored so that only this
# round's chunk files (fr-unions3_<n>.parquet) are picked up.
files <- list.files(path = "./2_scripts/2_0_tempfiles",
                    pattern = "^fr-unions3_[0-9]+\\.parquet$")
chunk_paths <- paste0("./2_scripts/2_0_tempfiles/", files)

# Read every chunk into a list and bind once, instead of growing `output`
# with bind_rows() on each iteration.
chunk_tables <- lapply(seq_along(files), function(i) {
  print(files[i])
  read_parquet(chunk_paths[i]) %>% 
    distinct()
})

# collect data_frame and make distinct 
output <- bind_rows(chunk_tables) %>% 
  distinct()
rm(chunk_tables)
write_parquet(output,"./2_scripts/2_0_tempfiles/fr-union3.parquet")

# Delete the chunk files only after the combined file has been written, so a
# failure above does not lose intermediate results.
invisible(file.remove(chunk_paths))
invisible(gc())

#------------------------------------------------------------------------------#
# 4. link unions to persons
# this gives us:
#  grandparents
#  parents
#  subject
#  spouse
#  siblings
#  children
#  sibling's spouses
#  nephews, nieces
#  parents-in-law
#  siblings-in-law
#  grandchildren

print('4. link unions to persons')

# Round 3 output: a single union_id column, kept as an Arrow Table.
previous <- read_parquet("./2_scripts/2_0_tempfiles/fr-union3.parquet",
                         as_data_frame = FALSE)

# split the data in to chunks: geni_unions parts 0..max_chunk
max_chunk <- 16

for (chunk_n in 0:max_chunk) {
  print(paste0("Reading chunk ",chunk_n+1,"/",max_chunk+1))
  fname <- paste0("./1_raw_data/1_2_geni/1_2_geni_chunked/geni_unions/part-",
                  chunk_n, ".parquet")
  unions <- read_parquet(fname,
                         as_data_frame = FALSE,
                         col_select = c(1, 2))
  gc()

  # Keep the persons attached to any union found in round 3. The key is
  # spelled out so a wrong `previous` file fails instead of silently joining
  # on a different shared column.
  unions <- unions %>% 
    dplyr::inner_join(previous, by = "union_id") %>% 
    dplyr::select(profile_id) %>% 
    dplyr::compute()

  print(paste0("Saving ",nrow(unions),":",chunk_n+1,"/",max_chunk+1))
  write_parquet(unions,
                paste0("./2_scripts/2_0_tempfiles/fr-unions4_",chunk_n,".parquet"))
}

# Combine the per-chunk results. The pattern is anchored so that only this
# round's chunk files (fr-unions4_<n>.parquet) are picked up.
files <- list.files(path = "./2_scripts/2_0_tempfiles",
                    pattern = "^fr-unions4_[0-9]+\\.parquet$")
chunk_paths <- paste0("./2_scripts/2_0_tempfiles/", files)

# Read every chunk into a list and bind once, instead of growing `output`
# with bind_rows() on each iteration.
chunk_tables <- lapply(seq_along(files), function(i) {
  print(files[i])
  read_parquet(chunk_paths[i]) %>% 
    distinct()
})

# collect data_frame and make distinct 
output <- bind_rows(chunk_tables) %>% 
  distinct()
rm(chunk_tables)
write_parquet(output,"./2_scripts/2_0_tempfiles/fr-union4.parquet")

# Delete the chunk files only after the combined file has been written, so a
# failure above does not lose intermediate results.
invisible(file.remove(chunk_paths))
invisible(gc())


#------------------------------------------------------------------------------#
# 5. link persons to unions
# this gives us:
#  the unions of every person found in round 4

print('5. link persons to unions')

# Round 4 output: a single profile_id column, kept as an Arrow Table.
previous <- read_parquet("./2_scripts/2_0_tempfiles/fr-union4.parquet",
                         as_data_frame = FALSE)

# split the data in to chunks: geni_unions parts 0..max_chunk
max_chunk <- 16

for (chunk_n in 0:max_chunk) {
  print(paste0("Reading chunk ",chunk_n+1,"/",max_chunk+1))
  fname <- paste0("./1_raw_data/1_2_geni/1_2_geni_chunked/geni_unions/part-",
                  chunk_n, ".parquet")
  unions <- read_parquet(fname,
                         as_data_frame = FALSE,
                         col_select = c(1, 2))
  gc()

  # Keep the unions of any person found in round 4. The key is spelled out
  # so a wrong `previous` file fails instead of silently joining on a
  # different shared column.
  unions <- unions %>% 
    dplyr::inner_join(previous, by = "profile_id") %>% 
    dplyr::select(union_id) %>% 
    dplyr::compute()

  print(paste0("Saving ",nrow(unions),":",chunk_n+1,"/",max_chunk+1))
  write_parquet(unions,
                paste0("./2_scripts/2_0_tempfiles/fr-unions5_",chunk_n,".parquet"))
}

# Combine the per-chunk results. The pattern is anchored so that only this
# round's chunk files (fr-unions5_<n>.parquet) are picked up.
files <- list.files(path = "./2_scripts/2_0_tempfiles",
                    pattern = "^fr-unions5_[0-9]+\\.parquet$")
chunk_paths <- paste0("./2_scripts/2_0_tempfiles/", files)

# Read every chunk into a list and bind once, instead of growing `output`
# with bind_rows() on each iteration.
chunk_tables <- lapply(seq_along(files), function(i) {
  print(files[i])
  read_parquet(chunk_paths[i]) %>% 
    distinct()
})

# collect data_frame and make distinct 
output <- bind_rows(chunk_tables) %>% 
  distinct()
rm(chunk_tables)
write_parquet(output,"./2_scripts/2_0_tempfiles/fr-union5.parquet")

# Delete the chunk files only after the combined file has been written, so a
# failure above does not lose intermediate results.
invisible(file.remove(chunk_paths))
invisible(gc())



#------------------------------------------------------------------------------#
# 6. link unions to persons
# this gives us:
# fill in

print('6. link unions to persons')

# Round 5 output: a single union_id column, kept as an Arrow Table.
# This step previously re-read fr-union4.parquet (the round 4 persons), so
# round 5's result was never used and the join below fell back to matching
# on profile_id instead of expanding the round 5 unions to their members.
previous <- read_parquet("./2_scripts/2_0_tempfiles/fr-union5.parquet",
                         as_data_frame = FALSE)

# split the data in to chunks: geni_unions parts 0..max_chunk
max_chunk <- 16

for (chunk_n in 0:max_chunk) {
  print(paste0("Reading chunk ",chunk_n+1,"/",max_chunk+1))
  fname <- paste0("./1_raw_data/1_2_geni/1_2_geni_chunked/geni_unions/part-",
                  chunk_n, ".parquet")
  # no col_select here: every column of the chunk is read
  unions <- read_parquet(fname,
                         as_data_frame = FALSE)
  gc()

  # note: I am keeping all the variables now!
  # The key is spelled out so a wrong `previous` file fails instead of
  # silently joining on a different shared column.
  unions <- unions %>% 
    dplyr::inner_join(previous, by = "union_id") %>% 
    dplyr::compute()

  print(paste0("Saving ",nrow(unions),":",chunk_n+1,"/",max_chunk+1))
  write_parquet(unions,
                paste0("./2_scripts/2_0_tempfiles/fr-unions6_",chunk_n,".parquet"))
}

# Combine the per-chunk results. The pattern is anchored so that only this
# round's chunk files (fr-unions6_<n>.parquet) are picked up.
files <- list.files(path = "./2_scripts/2_0_tempfiles",
                    pattern = "^fr-unions6_[0-9]+\\.parquet$")
chunk_paths <- paste0("./2_scripts/2_0_tempfiles/", files)

# Read every chunk into a list and bind once, instead of growing `output`
# with bind_rows() on each iteration.
chunk_tables <- lapply(seq_along(files), function(i) {
  print(files[i])
  read_parquet(chunk_paths[i]) %>% 
    distinct()
})

# collect data_frame and make distinct 
output <- bind_rows(chunk_tables) %>% 
  distinct()
rm(chunk_tables)
write_parquet(output,"./2_scripts/2_0_tempfiles/fr-union6.parquet")

# Delete the chunk files only after the combined file has been written, so a
# failure above does not lose intermediate results.
invisible(file.remove(chunk_paths))
invisible(gc())
